/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.placementdriver;

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
import org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.placementdriver.PlacementDriverManagerTest.LogicalTopologyServiceTestImpl;
import org.apache.ignite.internal.placementdriver.leases.Lease;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.TestLozaFactory;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * There are tests of muti-nodes for placement driver.
 */
@ExtendWith(ConfigurationExtension.class)
public class MultiActorPlacementDriverTest extends BasePlacementDriverTest {
    public static final int BASE_PORT = 1234;

    private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();

    @InjectConfiguration
    private RaftConfiguration raftConfiguration;

    @InjectConfiguration
    private SystemDistributedConfiguration systemDistributedConfiguration;

    @InjectConfiguration
    private ReplicationConfiguration replicationConfiguration;

    private List<String> placementDriverNodeNames;

    private List<String> nodeNames;

    private List<AutoCloseable> servicesToClose;

    /** The manager is used to read a data from Meta storage in the tests. */
    private MetaStorageManagerImpl metaStorageManager;

    /** Cluster service by node name. */
    private Map<String, ClusterService> clusterServices;

    /** This closure handles {@link LeaseGrantedMessage} to check the placement driver manager behavior. */
    private IgniteTriFunction<LeaseGrantedMessage, String, String, LeaseGrantedMessageResponse> leaseGrantHandler;

    private final AtomicInteger nextTableId = new AtomicInteger(1);

    private final long assignmentsTimestamp = new HybridTimestamp(0, 1).longValue();

    @BeforeEach
    public void beforeTest(TestInfo testInfo) {
        this.placementDriverNodeNames = IntStream.range(BASE_PORT, BASE_PORT + 3).mapToObj(port -> testNodeName(testInfo, port))
                .collect(Collectors.toList());
        this.nodeNames = IntStream.range(BASE_PORT, BASE_PORT + 5).mapToObj(port -> testNodeName(testInfo, port))
                .collect(Collectors.toList());

        this.clusterServices = startNodes();

        List<LogicalTopologyServiceTestImpl> logicalTopManagers = new ArrayList<>();

        servicesToClose = (List<AutoCloseable>) startPlacementDriver(clusterServices, logicalTopManagers, workDir);

        for (String nodeName : nodeNames) {
            if (!placementDriverNodeNames.contains(nodeName)) {
                var service = clusterServices.get(nodeName);

                assertThat(service.startAsync(new ComponentContext()), willCompleteSuccessfully());

                servicesToClose.add(() -> {
                    service.beforeNodeStop();

                    assertThat(service.stopAsync(new ComponentContext()), willCompleteSuccessfully());
                });
            }
        }

        logicalTopManagers.forEach(LogicalTopologyServiceTestImpl::updateTopology);
    }

    @AfterEach
    public void afterTest() throws Exception {
        IgniteUtils.closeAll(servicesToClose);
    }

    /**
     * Handles a lease grant message.
     *
     * @param handlerService Node service which will handles the message.
     * @return Response message.
     */
    private NetworkMessageHandler leaseGrantMessageHandler(ClusterService handlerService) {
        return (msg, sender, correlationId) -> {
            if (!(msg instanceof PlacementDriverReplicaMessage)) {
                return;
            }

            var handlerNode = handlerService.topologyService().localMember();

            log.info("Lease is being granted [actor={}, recipient={}, force={}]", sender, handlerNode.name(),
                    ((LeaseGrantedMessage) msg).force());

            LeaseGrantedMessageResponse resp = null;

            if (leaseGrantHandler != null) {
                resp = leaseGrantHandler.apply((LeaseGrantedMessage) msg, sender.name(), handlerNode.name());
            }

            if (resp == null) {
                resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                        .accepted(true)
                        .build();
            }

            handlerService.messagingService().respond(sender, resp, correlationId);
        };
    }

    /**
     * Starts cluster nodes.
     *
     * @return Cluster services.
     */
    public Map<String, ClusterService> startNodes() {
        var res = new HashMap<String, ClusterService>(nodeNames.size());

        var nodeFinder = new StaticNodeFinder(IntStream.range(BASE_PORT, BASE_PORT + 5)
                .mapToObj(p -> new NetworkAddress("localhost", p))
                .collect(Collectors.toList()));

        int port = BASE_PORT;

        for (String nodeName : nodeNames) {
            var srvc = ClusterServiceTestUtils.clusterService(nodeName, port++, nodeFinder);

            srvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, leaseGrantMessageHandler(srvc));

            res.put(nodeName, srvc);
        }

        return res;
    }

    /**
     * Starts placement driver.
     *
     * @param services Cluster services.
     * @param logicalTopManagers The list to update in the method. The list might be used for driving of the logical topology.
     * @return List of closures to stop the services.
     */
    private List<? extends AutoCloseable> startPlacementDriver(
            Map<String, ClusterService> services,
            List<LogicalTopologyServiceTestImpl> logicalTopManagers,
            Path workDir
    ) {
        var res = new ArrayList<Node>(placementDriverNodeNames.size());

        for (int i = 0; i < placementDriverNodeNames.size(); i++) {
            String nodeName = placementDriverNodeNames.get(i);
            var clusterService = services.get(nodeName);

            ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class);

            Set<String> metaStorageNodes = Set.copyOf(placementDriverNodeNames);
            when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(metaStorageNodes));
            when(cmgManager.metaStorageInfo()).thenReturn(completedFuture(
                    new CmgMessagesFactory().metaStorageInfo().metaStorageNodes(metaStorageNodes).build()
            ));
            configureCmgManagerToStartMetastorage(cmgManager);

            RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener();

            var logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService);

            logicalTopManagers.add(logicalTopologyService);

            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
                    clusterService,
                    logicalTopologyService,
                    Loza.FACTORY,
                    eventsClientListener
            );

            HybridClock nodeClock = new HybridClockImpl();

            ComponentWorkingDir workingDir = new ComponentWorkingDir(workDir.resolve(nodeName + "_loza"));

            LogStorageFactory partitionsLogStorageFactory = SharedLogStorageFactoryUtils.create(
                    clusterService.nodeName(),
                    workingDir.raftLogPath()
            );

            var raftManager = TestLozaFactory.create(
                    clusterService,
                    raftConfiguration,
                    nodeClock,
                    eventsClientListener
            );

            var readOperationForCompactionTracker = new ReadOperationForCompactionTracker();

            var storage = new SimpleInMemoryKeyValueStorage(nodeName, readOperationForCompactionTracker);

            ClockService clockService = new TestClockService(nodeClock);

            ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(workDir.resolve(nodeName + "_metastorage"));

            LogStorageFactory msLogStorageFactory =
                    SharedLogStorageFactoryUtils.create(clusterService.nodeName(), metastorageWorkDir.raftLogPath());

            RaftGroupOptionsConfigurer msRaftConfigurer =
                    RaftGroupOptionsConfigHelper.configureProperties(msLogStorageFactory, metastorageWorkDir.metaPath());

            var metaStorageManager = new MetaStorageManagerImpl(
                    clusterService,
                    cmgManager,
                    logicalTopologyService,
                    raftManager,
                    storage,
                    nodeClock,
                    topologyAwareRaftGroupServiceFactory,
                    new NoOpMetricManager(),
                    systemDistributedConfiguration,
                    msRaftConfigurer,
                    readOperationForCompactionTracker
            );

            if (this.metaStorageManager == null) {
                this.metaStorageManager = metaStorageManager;
            }

            var placementDriverManager = new PlacementDriverManager(
                    nodeName,
                    metaStorageManager,
                    MetastorageGroupId.INSTANCE,
                    clusterService,
                    cmgManager::metaStorageNodes,
                    logicalTopologyService,
                    raftManager,
                    topologyAwareRaftGroupServiceFactory,
                    clockService,
                    mock(FailureProcessor.class),
                    new SystemPropertiesNodeProperties(),
                    replicationConfiguration,
                    Runnable::run,
                    mock(MetricManager.class),
                    zoneId -> completedFuture(Set.of()),
                    zoneId -> null
            );

            res.add(new Node(
                    nodeName,
                    clusterService,
                    raftManager,
                    partitionsLogStorageFactory,
                    msLogStorageFactory,
                    metaStorageManager,
                    placementDriverManager
            ));
        }

        assertThat(allOf(res.stream().map(Node::startAsync).toArray(CompletableFuture[]::new)), willCompleteSuccessfully());

        return res;
    }

    @Test
    public void testLeaseCreate() throws Exception {
        PartitionGroupId grpPart0 = createTableAssignment();

        checkLeaseCreated(grpPart0, true);
    }

    @Test
    public void testLeaseProlong() throws Exception {
        var acceptedNodeRef = new AtomicReference<String>();

        leaseGrantHandler = (msg, from, to) -> {
            acceptedNodeRef.compareAndSet(null, to);

            return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                    .accepted(true)
                    .build();
        };

        PartitionGroupId grpPart0 = createTableAssignment();

        Lease lease = checkLeaseCreated(grpPart0, true);
        Lease leaseRenew = waitForProlong(grpPart0, lease);

        assertEquals(acceptedNodeRef.get(), leaseRenew.getLeaseholder());
    }

    @Test
    public void prolongAfterActiveActorChanged() throws Exception {
        var acceptedNodeRef = new AtomicReference<String>();

        leaseGrantHandler = (msg, from, to) -> {
            acceptedNodeRef.compareAndSet(null, to);

            return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                    .accepted(true)
                    .build();
        };

        PartitionGroupId grpPart0 = createTableAssignment();

        Lease lease = checkLeaseCreated(grpPart0, true);

        CompletableFuture<RaftGroupService> msRaftClientFuture = metaStorageManager.metaStorageService()
                .thenApply(MetaStorageServiceImpl::raftGroupService);

        assertThat(msRaftClientFuture, willCompleteSuccessfully());

        RaftGroupService msRaftClient = msRaftClientFuture.join();

        assertThat(msRaftClient.refreshLeader(), willCompleteSuccessfully());

        Peer previousLeader = msRaftClient.leader();

        Peer newLeader = msRaftClient.peers().stream().filter(peer -> !peer.equals(previousLeader)).findAny().get();

        log.info("The placement driver group active actor is transferring [from={}, to={}]", previousLeader, newLeader);

        assertThat(msRaftClient.transferLeadership(newLeader), willCompleteSuccessfully());

        waitForProlong(grpPart0, lease);

        assertEquals(newLeader, msRaftClient.leader());
    }

    @Test
    public void testLeaseProlongAfterRedirect() throws Exception {
        var declinedNodeRef = new AtomicReference<String>();
        var acceptedNodeRef = new AtomicReference<String>();

        leaseGrantHandler = (msg, from, to) -> {
            if (declinedNodeRef.compareAndSet(null, to)) {
                var redirectNode = nodeNames.stream().filter(nodeName -> !nodeName.equals(to)).findAny().get();

                acceptedNodeRef.compareAndSet(null, redirectNode);

                log.info("Leaseholder candidate proposes other node to hold the lease [candidate={}, proposal={}]", to, redirectNode);

                return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                        .accepted(false)
                        .redirectProposal(redirectNode)
                        .build();
            } else {
                log.info("Lease is accepted [leaseholder={}]", to);

                return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                        .accepted(true)
                        .build();
            }
        };

        PartitionGroupId grpPart0 = createTableAssignment();

        Lease lease = checkLeaseCreated(grpPart0, true);

        assertEquals(lease.getLeaseholder(), acceptedNodeRef.get());

        waitForProlong(grpPart0, lease);
    }

    @Test
    public void testDeclineLeaseByLeaseholder() throws Exception {
        var acceptedNodeRef = new AtomicReference<String>();
        var activeActorRef = new AtomicReference<String>();

        leaseGrantHandler = (msg, from, to) -> {
            acceptedNodeRef.set(to);
            activeActorRef.set(from);

            return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                    .accepted(true)
                    .build();
        };

        PartitionGroupId grpPart = createTableAssignment();

        Lease lease = checkLeaseCreated(grpPart, true);

        lease = waitForProlong(grpPart, lease);

        assertEquals(acceptedNodeRef.get(), lease.getLeaseholder());
        assertTrue(lease.isAccepted());

        var service = clusterServices.get(acceptedNodeRef.get());

        leaseGrantHandler = (msg, from, to) -> {
            if (acceptedNodeRef.get().equals(to)) {
                var redirectNode = nodeNames.stream().filter(nodeName -> !nodeName.equals(to)).findAny().get();

                return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                        .redirectProposal(redirectNode)
                        .accepted(false)
                        .build();
            } else {
                return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
                        .accepted(true)
                        .build();
            }
        };

        final Lease fLease = lease;
        String proposedLeaseholder = nodeNames.stream().filter(n -> !n.equals(fLease.getLeaseholder())).findAny().orElseThrow();

        service.messagingService().send(
                clusterServices.get(activeActorRef.get()).topologyService().localMember(),
                PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
                        .groupId(grpPart)
                        .redirectProposal(proposedLeaseholder)
                        .build()
        );

        Lease leaseRenew = waitNewLeaseholder(grpPart, lease);

        log.info("Lease move from {} to {}", lease.getLeaseholder(), leaseRenew.getLeaseholder());

        assertEquals(proposedLeaseholder, leaseRenew.getLeaseholder());
    }

    /**
     * Waits for a new leaseholder.
     *
     * @param grpPart Replication group id.
     * @param lease Previous lease.
     * @return Renewed lease.
     * @throws InterruptedException If the waiting is interrupted.
     */
    private Lease waitNewLeaseholder(PartitionGroupId grpPart, Lease lease) throws InterruptedException {
        var leaseRenewRef = new AtomicReference<Lease>();

        assertTrue(waitForCondition(() -> {
            var fut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);

            Lease leaseRenew = leaseFromBytes(fut.join().value(), grpPart);

            if (lease == null) {
                return false;
            }

            if (!lease.getLeaseholder().equals(leaseRenew.getLeaseholder())) {
                leaseRenewRef.set(leaseRenew);

                return true;
            }

            return false;
        }, 10_000));

        assertTrue(lease.getExpirationTime().compareTo(leaseRenewRef.get().getStartTime()) < 0);

        return leaseRenewRef.get();
    }

    /**
     * Waits for a lease prolong.
     *
     * @param grpPart Replication group id.
     * @param lease Lease which waits for prolong.
     * @return Renewed lease.
     * @throws InterruptedException If the waiting is interrupted.
     */
    private Lease waitForProlong(PartitionGroupId grpPart, Lease lease) throws InterruptedException {
        var leaseRenewRef = new AtomicReference<Lease>();

        assertTrue(waitForCondition(() -> {
            if (lease == null) {
                return false;
            }

            CompletableFuture<Entry> msFur = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY).exceptionally(ex -> {
                log.info("Meta storage is unavailable", ex);

                return null;
            });

            assertThat(msFur, willCompleteSuccessfully());

            if (msFur.join() == null) {
                return false;
            }

            Lease leaseRenew = leaseFromBytes(msFur.join().value(), grpPart);

            if (lease.getExpirationTime().compareTo(leaseRenew.getExpirationTime()) < 0) {
                leaseRenewRef.set(leaseRenew);

                return true;
            }

            return false;
        }, 10_000));

        assertEquals(lease.getLeaseholder(), leaseRenewRef.get().getLeaseholder());
        assertEquals(lease.getStartTime(), leaseRenewRef.get().getStartTime());

        return leaseRenewRef.get();
    }

    /**
     * Checks if a group lease is created during the timeout.
     *
     * @param grpPartId Replication group id.
     * @param waitAccept Await a lease with the accepted flag.
     * @return A lease that is read from Meta storage.
     * @throws InterruptedException If the waiting is interrupted.
     */
    private Lease checkLeaseCreated(PartitionGroupId grpPartId, boolean waitAccept) throws InterruptedException {
        AtomicReference<Lease> leaseRef = new AtomicReference<>();

        assertTrue(waitForCondition(() -> {
            var leaseFut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);

            var leaseEntry = leaseFut.join();

            if (leaseEntry != null && !leaseEntry.empty()) {
                Lease lease = leaseFromBytes(leaseEntry.value(), grpPartId);

                if (lease == null) {
                    return false;
                }

                if (!waitAccept) {
                    leaseRef.set(lease);
                } else if (lease.isAccepted()) {
                    leaseRef.set(lease);
                }
            }

            return leaseRef.get() != null;
        }, 10_000));

        return leaseRef.get();
    }

    /**
     * Creates an assignment for the fake table.
     *
     * @return Replication group id.
     */
    private PartitionGroupId createTableAssignment() {
        return createAssignments(metaStorageManager, nextTableId.get(), nodeNames, assignmentsTimestamp);
    }
}
